自拓展输入流 |
您所在的位置:网站首页 › flink groupid › 自拓展输入流 |
用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。 语法格式1 2 3 4 5 6 7CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME 关键字 表1 关键字说明参数 是否必选 说明 type 是 数据源类型,"user_defined"表示数据源为用户自定义数据源。 type_class_name 是 用户实现获取源数据的source类名称,注意包含完整包路径。 type_class_parameter 是 用户自定义source类的入参,仅支持一个string类型的参数。 注意事项用户自定义source类需要继承类RichParallelSourceFunction,并指定数据类型为Row例如定义类MySource:public class MySource extends RichParallelSourceFunction{},重点实现其中的open、run和close函数。 依赖pom: org.apache.flink flink-streaming-java_2.11 ${flink.version} provided org.apache.flink flink-core ${flink.version} provided 示例实现每周期产生一条数据(仅包含一个字段,类型为INT,初始值为1,每周期加1),周期时长为60s,通过入参指定。 1 2 3 4 5 6 7 8 9CREATE SOURCE STREAM user_in_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySource", type_class_parameter = "60" ) TIMESTAMP BY car_timestamp.rowtime;![]() 自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |